Asynchronous lookups #498
@petar thanks, the changes you made look pretty good. However, now that we're not using mutexes, I think it's a lot simpler to scrap all of the NumX, IsX, and MarkX functions and replace them with len(GetX), GetState, and SetState.
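A minimal sketch of what that slimmed-down surface might look like (illustration only; the map-based storage, the constructor, and the import path are assumptions, not the PR's actual qpeerset code):

```go
package qpeerset

import "github.com/libp2p/go-libp2p-core/peer"

type PeerState int

const (
	PeerSeen PeerState = iota
	PeerWaiting
	PeerQueried
	PeerUnreachable
)

// QueryPeerset tracks the lookup state of each peer.
type QueryPeerset struct {
	states map[peer.ID]PeerState
}

func NewQueryPeerset() *QueryPeerset {
	return &QueryPeerset{states: make(map[peer.ID]PeerState)}
}

// GetState returns the current state of p (the zero value PeerSeen is
// returned for peers that were never added).
func (qp *QueryPeerset) GetState(p peer.ID) PeerState {
	return qp.states[p]
}

// SetState replaces all of the per-state MarkX helpers with one setter.
func (qp *QueryPeerset) SetState(p peer.ID, s PeerState) {
	qp.states[p] = s
}
```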
qpeerset/qpeerset.go (outdated)
```go
type PeerState int

const (
	PeerSeen PeerState = iota
	PeerWaiting
	PeerQueried
	PeerUnreachable
)
```
Why are this type and these consts exported? The only way they're used is by calling IsX(peer.ID) and MarkX(peer.ID); we should probably either make these unexported or replace all the IsX/MarkX functions with GetState(peer.ID) PeerState and SetState(peer.ID, PeerState).
The Get and Set functions seem easier to manage to me than a whole set of functions.
Agreed. Done.
query.go (outdated)
```go
	// closest nodes it has heard about.
	if len(peersToQuery) == 0 {
		select {
		case sawPeers := <-ch:
```
This isn't just seen peers; maybe rename this variable and the populatePeers function to something indicating that we're updating the states of the peers based on query responses. Perhaps something like response and q.updatePeerState().
Done.
query.go (outdated)
```go
		queryDone <- struct{}{}
	}()
}

loop:
	// wait for all the "d" disjoint queries to complete before we return
	// XXX: waiting until all queries are done is a security bug!!!
```
Leaving this comment in the code without context will probably be confusing. Perhaps put in a couple-line explanation like:
While running disjoint path queries may protect users from malicious peers they are directly connected to (assuming the peer is not eclipsed), it still allows malicious peers to cause you to make many queries, since we wait until all paths have completed.
Done.
qpeerset/qpeerset.go (outdated)
```go
	return
}

func (qp *QueryPeerset) GetClosestNotUnreachable(k int) (result []peer.ID) {
```
This should probably be b or beta instead of k, since this isn't strictly about the bucket size (and we're passing in beta = 3 by default here).
Done.
Force-pushed from fe4f7b0 to 36261f0.
This is poetry.
query.go (outdated)
```go
		continue
	}
	q.queryPeers.TryAdd(p)
	q.queryPeers.MarkUnreachable(p)
```
I've commented this elsewhere too: we can simply remove the peer from queryPeers here and not maintain this state at all.
Two reasons:
- (Main) The QueryPeerset data structure does not support fast removes.
- Remembering the unreachable peers allows me to add richer breakpoint conditions in code while debugging.
routing.go (outdated)
```diff
@@ -412,7 +411,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
 func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) {
 	shortcutTaken := false
 	for _, q := range queries {
-		if q.localPeers.LenUnqueriedFromKClosest() > 0 {
+		if len(q.queryPeers.GetClosestNotUnreachable(3)) > 0 {
```
- It should be Beta, not 3.
- Won't this always be true? If we terminate because of isLookupTermination(), it will be because this was greater than zero.
dht_test.go (outdated)
```go
	for _, d := range dhts {
		d.RefreshRoutingTable()
	}
```
@Stebalien this is sort of interesting in that it shows that our new logic works on valid routing tables and not invalid/random tables.
All comments addressed.
Force-pushed from 4f2396a to 0a01437.
Made a few updates. Summary:
- The query state for a peer used to be able to regress; now it can't.
- We return a lookup result struct instead of the query structs themselves when completing a lookup operation.
- We ensure that the top k peers have been queried before a lookup returns, since our higher-level logic requires that invariant, even though our query logic now only requires the top beta peers to have been queried (the other k-beta can just be peers we've heard about but have yet to connect to). A rough sketch of this followup idea is below.
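A rough sketch of the followup idea under stated assumptions (the helper name lookupFollowup and the queryFn(ctx, peer.ID) call shape are guesses for illustration, not the PR's actual code):

```go
// lookupFollowup queries any of the returned closest peers that are still
// only heard-of, so callers can rely on the closest k peers having been
// dialed even when beta < k.
func (dht *IpfsDHT) lookupFollowup(ctx context.Context, lookupRes *lookupResult, queryFn queryFn) {
	for i, p := range lookupRes.peers {
		if lookupRes.state[i] != qpeerset.PeerHeard {
			continue // already queried or known unreachable
		}
		// A failed query just means this peer was unreachable; the caller
		// can then drop it from the final result set.
		_, _ = queryFn(ctx, p)
	}
}
```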
```go
	if len(out) < querier.beta {
		t.Fatalf("got wrong number of peers (got %d, expected at least %d)", len(out), querier.beta)
```
This test is pretty strange but indicative of the changes we've been making. The test seems to set up a ring of peers and then calls GetClosestPeers. In the past we were much more thorough in our queries and effectively ignored most of the benefits that can be assumed in a Kademlia network with fairly accurate routing tables. This meant we could have a ring of 30 peers and get 20 of them no problem.
Now we rely more strongly on the routing tables being good, so in this ring setup it seems like we're only guaranteed to return > beta peers instead of k.
Thoughts?
We've basically broken the contract of GetClosestPeers, as I've commented elsewhere. There are a couple of things we can/should do to address this, which have been mentioned in the comment.
```diff
@@ -399,26 +393,18 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
 	if err != nil {
 		return
 	}
-	queriesCh <- queries
+	lookupResCh <- lookupRes

 	if ctx.Err() == nil {
```
I suspect we could bundle this check into the lookup.completed variable, but haven't gotten around to it yet.
```diff
@@ -629,13 +615,13 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash

 			return peers, nil
 		},
-		func(peerset *kpeerset.SortedPeerset) bool {
+		func() bool {
 			return ps.Size() >= count
 		},
 	)

 	if err != nil && ctx.Err() == nil {
```
I suspect we could bundle this check into the lookup.completed variable, but haven't gotten around to it yet.
```diff
 		out <- p
 	}

-	if ctx.Err() == nil {
+	if ctx.Err() == nil && lookupRes.completed {
```
I suspect we could bundle this check into the lookup.completed variable, but haven't gotten around to it yet.
If ctx.Err != nil, we set lookupRes.completed to false. So, if lookupRes.completed is true, we know the context has not errored. Hence, we can remove that check and just call refreshRTIfNoShortcut here.
I'm going to wait to coalesce the context errors until after disjoint paths are removed. I think it'll make ensuring correctness a lot easier.
…d as the Resiliency parameter.
…rom a lookup have all been queried even for beta < k. Modified the structure returned from lookups to be a useful subset of the full query state.
…account disjoint paths. Adjust test to take into account new query behavior (i.e. GetClosestPeers is no longer guaranteed to find k peers if the routing tables are invalid, e.g. ring setups)
Force-pushed from 72cd036 to c9d93bd.
Will push the tests once the PR is good to go.
Let me know if I've missed breaking changes in any of the APIs. I think I've covered them all.
```go
	if p == q.dht.self { // don't add self.
		continue
	}
	q.queryPeers.TryAdd(p)
```
Just because you've heard about a peer again does not mean it's available. You could easily have the same unreachable peer in multiple RTs, so I'm not sure how helpful this would be.
If you choose to go ahead with it anyway, we should at least wait for signed peer records, as this opens up another attack vector otherwise.
```diff
 	return &queryResult{
 		foundCloserPeer: foundCloserPeer,

+func (q *query) updateState(up *queryUpdate) {
```
We need a TryAdd call ONLY for up.seen. If the peer is in the queried or unreachable collection but NOT in the queryPeers set, something has gone wrong.
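A sketch of the stricter handling (the queryUpdate field names heard/queried/unreachable and the logger call are assumptions, not the PR's actual code):

```go
func (q *query) updateState(up *queryUpdate) {
	// TryAdd is only needed for newly heard peers.
	for _, p := range up.heard {
		q.queryPeers.TryAdd(p)
	}
	// A queried update for a peer in any state other than waiting means
	// the query logic has a bug, so log it rather than papering over it.
	for _, p := range up.queried {
		if st := q.queryPeers.GetState(p); st == qpeerset.PeerWaiting {
			q.queryPeers.SetState(p, qpeerset.PeerQueried)
		} else {
			logger.Errorf("kademlia protocol error: tried to mark peer queried from state %v", st)
		}
	}
	for _, p := range up.unreachable {
		q.queryPeers.SetState(p, qpeerset.PeerUnreachable)
	}
}
```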
Yes, I left them in pending comments from @petar since he was the one who advocated for them being left in defensively. I'm inclined to agree that we should be more closely controlling state transitions and logging errors though.
I agree here with both of you. We can remove TryAdd from queried and unreachable.
The comment I made that I think Adin is referring to was (in my mind) about the "if p == q.dht.self" checks.
```diff
@@ -669,7 +655,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
 	return pi, nil
```
@aschmahmann We've changed the contract of FindPeer. In Balsa (master), we don't mandate that a peer be connected for this API to return its address.
Now we mandate connectivity. IMO the API should not enforce this, and connectivity will matter less anyway once we have signed peer records.
Overall I agree that the API should not guarantee connectivity.
Are you referring to the return statement or to the stop function? For the return statement, we could be a little more clever and separately track whether any peer addresses are discovered for the peer during the target query, but this falls apart if we separately find the peer via another service while FindPeer is going on.
However, what's the abort condition? If the abort condition is just hearing about the peer, then we could end up not being able to connect to the peer because we got a single stale address for them and then gave up.
Once we have signed peer records in place, FindPeer should no longer require connectedness (although we could keep it as a secondary abort condition). See libp2p/go-libp2p#784 for a little more background on FindPeer thoughts.
1. The return statement:
```go
	// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
	if dht.host.Network().Connectedness(id) == network.Connected {
		return dht.peerstore.PeerInfo(id), nil
	}
	return peer.AddrInfo{}, routing.ErrNotFound
```
If we have an address for that peer in the peerstore, we should return it irrespective of whether we have a connection or not; a minimal sketch of this follows point 2 below. Even if we separately find the peer via another service while FindPeer is going on (low probability), there is no harm in doing this. I feel like mandating connectivity is too restrictive here.
2. Abort condition
> If the abort condition is just hearing about the peer, then we could end up not being able to connect to the peer because we got a single stale address for them and then gave up.
Sure. Signed peer records will solve this. Are you saying you want to wait till they go live before you remove this? If yes, that's fine by me.
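Going back to point 1, a minimal sketch of the suggested relaxation (illustrative only, not the PR's code):

```go
	// Return whatever addresses the peerstore has for the peer, irrespective
	// of whether we currently hold a connection to it.
	if pi := dht.peerstore.PeerInfo(id); len(pi.Addrs) > 0 {
		return pi, nil
	}
	return peer.AddrInfo{}, routing.ErrNotFound
```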
@aarshkshah1992 I modified this a bit, lmk if it seems reasonable to you.
- I ideally want to return routing.ErrNotFound whenever the DHT didn't cause you to find the records. Most people are probably doing FindPeer and then host.Connect, or peerstore.PeerInfo, anyhow, instead of actually using the records returned here. This is somewhat impractical currently, so instead I've thrown in a few conditions that indicate you may have received valid peer info from the DHT query (we can only verify via connecting until signed peer records land). lmk if you have issues/concerns with my proposal.
- I think we'll have to discuss this once the signed peer records land. I suspect what we'll end up with is similar to the proposals in the FindPeer Design Review (go-libp2p#784): have a FindPeerAsync function that returns the records as we get them, and perhaps abort early if we're connected, since then we know the peerstore definitely has good addresses.
```diff
@@ -76,7 +75,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
 	e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
```
Balsa: returns the K (bucket size) closest peers we KNOW are alive.
Cypress: returns the K closest peers, but we KNOW only that at most Beta of them (default 3, no particular ordering) are alive.
We should either:
- document this, OR
- make Beta=K, OR
- allow users to pass in the number of closest peers they want alive (see the hypothetical sketch below).
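For the third option, a hypothetical sketch in the functional-option style the DHT already uses (the option name AliveClosestPeers and the Options field are invented for illustration and do not exist in the PR):

```go
package dhtopts

import "fmt"

// Options is shown with only the field relevant to this sketch.
type Options struct {
	AliveClosestPeers int
}

type Option func(*Options) error

// AliveClosestPeers lets callers choose how many of the K closest peers
// must be confirmed alive before a lookup returns.
func AliveClosestPeers(n int) Option {
	return func(o *Options) error {
		if n < 1 {
			return fmt.Errorf("alive closest peers must be positive, got %d", n)
		}
		o.AliveClosestPeers = n
		return nil
	}
}
```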
This also applies by extension to Provide and PutValue.
I agree that this is tricky, especially for Provide and PutValue, since they imply replication semantics. I'm less concerned about GetClosestPeers since it has no inherent semantics and could just get a documentation change (also, given its lack of semantics, I doubt it's in real use; see here).
WDYT @petar?
My understanding of the decision here was that even though the internals have changed, the guarantees are similar, and the nuances are both not something the consumer of the DHT should care about and a bit much to just throw in as a documentation comment. If we want, we can add comments to GetClosestPeers since it's really an implementation-specific function (i.e. the contract is, and probably should be, tightly coupled to the implementation).
We are planning on rewriting the DHT spec once the new protocol version is released (or at least in the RC phase) and IMO detailing the specifics there or in a sister document would be excellent.
query.go (outdated)
```diff
-func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
-	queryCtx, cancelQuery := context.WithCancel(ctx)
+type lookupResult struct {
+	peers []peer.ID
```
Add a comment here explaining which peers these are.
```go
type lookupResult struct {
	peers     []peer.ID
	state     []qpeerset.PeerState
	completed bool
```
I would add a comment here too, since the meaning of completed is subtle.
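One possible commented version (the comment wording is a suggestion, not taken from the PR):

```go
type lookupResult struct {
	// peers is the set of the closest peers found by the lookup, sorted
	// by distance to the target key.
	peers []peer.ID
	// state[i] records the qpeerset state of peers[i] at the moment the
	// lookup terminated (e.g. PeerQueried vs. PeerHeard).
	state []qpeerset.PeerState
	// completed is true if the lookup reached its natural termination
	// condition, rather than being stopped early by the stop function or
	// a cancelled context.
	completed bool
}
```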
query.go (outdated)
```diff
-numQueriesComplete := 0
-queryDone := make(chan struct{}, d)
+// runLookup executes the lookup on the target using the given query function and stopping when either the context is
+// cancelled or the stop function returns true (if the stop function is not sticky it is not guaranteed to cause a stop
```
Instead of saying "if the stop function is not sticky ...", I would say "the stop function MUST be sticky".
Left this mostly as it is; the connectedness check we do during FindPeer is not sticky, but we're mostly OK with that, and I'm hoping to upgrade that logic a bit once we have signed peer records.
Tests for Async Query Logic
…strap logic now uses GetClosestPeers instead of FindPeer. FindPeer can return addresses even if not Connected as long as it was either recently connected (CanConnect) or was discovered during the lookup.
Beautiful.
```diff
+	dialedPeerDuringQuery := false
+	for i, p := range lookupRes.peers {
+		if p == id {
+			// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
+			// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
+			// server peer and is not identified as such is a bug.
+			dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
+			break
+		}
+	}
+
-	// TODO: Consider unlucky disconnect timing and potentially utilizing network.CanConnect or something similar
-	if dht.host.Network().Connectedness(id) == network.Connected {
+	// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
+	// to the peer.
+	connectedness := dht.host.Network().Connectedness(id)
+	if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
 		return dht.peerstore.PeerInfo(id), nil
```
@aarshkshah1992 just a heads up that I changed this around; going to do the squash (I'd like to make our lives easier when we merge cypress into master so it's not like 1000 commits). If we have any issues we'll just do a follow-up.
* feat(query): fully async implementation of Kademlia lookup. Peers returned from the lookup are not guaranteed to be alive (i.e. we're only guaranteed to have dialed the closest beta peers to the target), but given stable and correct routing tables the expectation that most of the peers returned are alive is high.
* feat(query): add wrapper lookup followup function to follow up after the lookup is completed and ensure that the closest k returned peers from a lookup have been queried, even for beta < k.
* refactor(query): modified the structure returned from lookups to be a useful subset of the full query state instead of the entire query state.
* feat(options): beta parameter exposed as the Resiliency parameter.
* feat(routing): do not mark the routing table as updated after a FindPeer query.
* feat(routing): FindPeer can return addresses even if not Connected, as long as the peer was either recently connected (CanConnect) or was discovered during the lookup.
* feat(bootstrap): bootstrap logic now uses GetClosestPeers instead of FindPeer.
* refactor(dht): stopFn no longer takes any state.
* fix(test): changed GetClosestPeers test to only assume beta instead of k peers, since that is now more appropriate given the query logic changes and that the routing tables in that test are bad, i.e. a ring network with arbitrary peerIDs.

Co-authored-by: Petar Maymounkov <petarm@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
The reworked query logic follows Kademlia, but does so in a suboptimal way since it queries peers in sequential chunks (i.e. alpha queries at a time). This PR makes the query process asynchronous (i.e. at most alpha outstanding queries at a time).
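A minimal, self-contained sketch of the "at most alpha outstanding queries" pattern described above, using a buffered channel as a semaphore (an illustration of the technique, not the PR's actual implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// lookup dispatches queries so that at most alpha are in flight at once,
// rather than running them in sequential chunks of alpha.
func lookup(peersToQuery []string, alpha int, queryPeer func(string)) {
	sem := make(chan struct{}, alpha) // semaphore with alpha slots
	var wg sync.WaitGroup
	for _, p := range peersToQuery {
		sem <- struct{}{} // blocks while alpha queries are outstanding
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			defer func() { <-sem }()
			queryPeer(p)
		}(p)
	}
	wg.Wait()
}

func main() {
	peers := []string{"QmA", "QmB", "QmC", "QmD", "QmE"}
	lookup(peers, 3, func(p string) { fmt.Println("querying", p) })
}
```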